From c1290f9eedac8e555e2a531993f5c4290cda3467 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 28 Jun 2014 21:23:50 -0700 Subject: [PATCH] Add a task-pool abstraction The standard library also provides a task pool, but it's scheduling is not quite the desired semantics. This task pool instead has all workers contend on a shared queue to take work from rather than assigning new jobs to specific workers for forever. --- src/cargo/util/mod.rs | 2 ++ src/cargo/util/pool.rs | 61 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 src/cargo/util/pool.rs diff --git a/src/cargo/util/mod.rs b/src/cargo/util/mod.rs index 5cc0ccb34..6b26d9bc8 100644 --- a/src/cargo/util/mod.rs +++ b/src/cargo/util/mod.rs @@ -6,6 +6,7 @@ pub use self::errors::{CliError, FromError, ProcessError}; pub use self::errors::{process_error, internal_error, internal, human}; pub use self::paths::realpath; pub use self::hex::to_hex; +pub use self::pool::TaskPool; pub mod graph; pub mod process_builder; @@ -16,3 +17,4 @@ pub mod toml; pub mod paths; pub mod errors; pub mod hex; +mod pool; diff --git a/src/cargo/util/pool.rs b/src/cargo/util/pool.rs new file mode 100644 index 000000000..2b3697a41 --- /dev/null +++ b/src/cargo/util/pool.rs @@ -0,0 +1,61 @@ +//! A load-balancing task pool. +//! +//! This differs in implementation from std::sync::TaskPool in that each job is +//! up for grabs by any of the child tasks in the pool. +//! +//! This should be upstreamed at some point. + +use std::sync::{Arc, Mutex}; + +pub struct TaskPool { + state: Arc>, +} + +struct State { done: bool, jobs: Vec } + +impl TaskPool { + pub fn new(tasks: uint) -> TaskPool { + assert!(tasks > 0); + + let state = Arc::new(Mutex::new(State { + done: false, + jobs: Vec::new(), + })); + + for _ in range(0, tasks) { + let myjobs = state.clone(); + spawn(proc() worker(&*myjobs)); + } + + return TaskPool { state: state }; + + fn worker(mystate: &Mutex) { + let mut state = mystate.lock(); + while !state.done { + match state.jobs.pop() { + Some(job) => { + drop(state); + job(); + state = mystate.lock(); + } + None => state.cond.wait(), + } + } + } + } + + pub fn execute(&self, job: proc():Send) { + let mut state = self.state.lock(); + state.jobs.push(job); + state.cond.signal(); + } +} + +impl Drop for TaskPool { + fn drop(&mut self) { + let mut state = self.state.lock(); + state.done = true; + state.cond.broadcast(); + drop(state); + } +} -- 2.30.2